# Source Generated with Decompyle++
# File: in.pyo (Python 2.5)
-
- from ZSI import _copyright, _children, _child_elements, _floattypes, _stringtypes, _seqtypes, _find_attr, _find_attrNS, _find_attrNodeNS, _find_arraytype, _find_default_namespace, _find_href, _find_encstyle, _resolve_prefix, _find_xsi_attr, _find_type, _find_xmlns_prefix, _get_element_nsuri_name, _get_idstr, _Node, EvaluateException, UNICODE_ENCODING, _valid_encoding, ParseException
- from ZSI.wstools.Namespaces import SCHEMA, SOAP
- from ZSI.wstools.Utility import SplitQName
- from ZSI.wstools.c14n import Canonicalize
- from ZSI.wstools.logging import getLogger as _GetLogger
- import re
- import types
- import time
- import copy
- from base64 import decodestring as b64decode, encodestring as b64encode
- from urllib import unquote as urldecode, quote as urlencode
- from binascii import unhexlify as hexdecode, hexlify as hexencode
-
- try:
- from cStringIO import StringIO
- except ImportError:
- from StringIO import StringIO
-
-
def _is_xsd_or_soap_ns(ns):
    """Return True when ``ns`` is one of the namespaces treated as built in.

    The accepted values are SCHEMA.XSD3, SOAP.ENC, SCHEMA.XSD1 and
    SCHEMA.XSD2.
    """
    return ns in [SCHEMA.XSD3, SOAP.ENC, SCHEMA.XSD1, SCHEMA.XSD2]
-
def _find_nil(E):
    """Return the xsi ``null`` attribute of ``E`` or, if that is false,
    its xsi ``nil`` attribute.

    The value is whatever _find_xsi_attr returns for the attribute that
    was found.
    """
    return _find_xsi_attr(E, 'null') or _find_xsi_attr(E, 'nil')
-
def _get_xsitype(pyclass):
    """Return the xsi:type of ``pyclass`` as a (namespaceURI, name) pair.

    * If ``pyclass.type`` is already a sequence it is returned unchanged.
    * If ``pyclass`` has both ``type`` and ``schema`` attributes the pair
      ``(pyclass.schema, pyclass.type)`` is returned.
    * Otherwise ``(None, None)`` is returned.
    """
    if hasattr(pyclass, 'type'):
        if type(pyclass.type) in _seqtypes:
            return pyclass.type
        if hasattr(pyclass, 'schema'):
            return (pyclass.schema, pyclass.type)

    return (None, None)
-
# Value handed back by the parse methods when an element is nil.
Nilled = None
# Marker string for an unlimited maxOccurs.
UNBOUNDED = 'unbounded'
-
class TypeCode:
    """Base class of the typecodes defined in this module.

    The base class stores naming/occurrence options and offers helpers for
    name and xsi:type checking, nil detection, text extraction and
    attribute handling.  ``parse``, ``serialize`` and ``text_to_data``
    raise EvaluateException until a subclass overrides them.

    Class attributes:
        tag -- None here; subclasses may override it.
        type -- (namespaceURI, name) pair used for xsi:type.
        typechecks -- flag consulted by subclasses for init-time checks.
        attribute_typecode_dict -- maps an attribute name, or a
            (namespaceURI, name) pair, to the typecode used for that
            attribute; None means "no declared attributes".
        logger -- logger used by this class.

    Subclasses may also define ``parselist`` (the (namespaceURI, name)
    pairs accepted by checkname/checktype); ``errorlist`` is generated
    from it on demand.
    """
    tag = None
    type = (None, None)
    typechecks = True
    attribute_typecode_dict = None
    logger = _GetLogger('ZSI.TC.TypeCode')

    def __init__(self, pname=None, aname=None, minOccurs=1, maxOccurs=1,
                 nillable=False, typed=True, unique=True, pyclass=None,
                 attrs_aname='_attrs', **kw):
        """Store the naming and occurrence options of the typecode.

        pname -- element name: a string (anything up to the last ':' is
            discarded) or a (namespaceURI, name) sequence.
        aname -- attribute name; falls back to the element name.
        minOccurs, maxOccurs, nillable, typed, unique, pyclass,
        attrs_aname -- stored unchanged on the instance.
        kw -- only 'encoded' is read here; when it is not None it replaces
            the namespace taken from pname.
        """
        if type(pname) in _seqtypes:
            (self.nspname, self.pname) = pname
        else:
            self.nspname = None
            self.pname = pname

        if self.pname:
            self.pname = str(self.pname).split(':')[-1]

        self.aname = aname or self.pname
        self.minOccurs = minOccurs
        self.maxOccurs = maxOccurs
        self.nillable = nillable
        self.typed = typed
        self.unique = unique
        self.attrs_aname = attrs_aname
        self.pyclass = pyclass

        encoded = kw.get('encoded')
        if encoded is not None:
            self.nspname = encoded

    def parse(self, elt, ps):
        """Not implemented in the base class; always raises
        EvaluateException."""
        raise EvaluateException('Unimplemented evaluation', ps.Backtrace(elt))

    def serialize(self, elt, sw, pyobj, name=None, orig=None, **kw):
        """Not implemented in the base class; always raises
        EvaluateException."""
        raise EvaluateException('Unimplemented evaluation', sw.Backtrace(elt))

    def text_to_data(self, text, elt, ps):
        """Not implemented in the base class; always raises
        EvaluateException."""
        raise EvaluateException('Unimplemented evaluation', ps.Backtrace(elt))

    def serialize_as_nil(self, elt):
        """Mark ``elt`` as nil by setting the SCHEMA.XSI3 ``nil``
        attribute to '1'."""
        elt.setAttributeNS(SCHEMA.XSI3, 'nil', '1')

    def SimpleHREF(self, elt, ps, tag):
        """Return the element that carries the value for ``elt``.

        An element with children is returned as is.  Otherwise its href is
        followed through ``ps.FindLocalHREF``.  Without an href the result
        is None when minOccurs is 0, else EvaluateException is raised;
        ``tag`` only appears in that error message.
        """
        if len(_children(elt)):
            return elt

        href = _find_href(elt)
        if not href:
            # Equality, not identity: ``is 0`` relies on int caching.
            if self.minOccurs == 0:
                return None
            raise EvaluateException('Required' + tag + ' missing',
                                    ps.Backtrace(elt))

        return ps.FindLocalHREF(href, elt, 0)

    def get_parse_and_errorlist(self):
        """Return ``(parselist, errorlist)`` of this instance's own class.

        Both are read from the class ``__dict__`` (so inherited values are
        not seen).  A missing errorlist is built from the distinct names
        in parselist joined with ' or ' and cached back on the class.
        """
        d = self.__class__.__dict__
        parselist = d.get('parselist')
        errorlist = d.get('errorlist')
        if parselist and not errorlist:
            names = []
            for t in parselist:
                if t[1] not in names:
                    names.append(t[1])
            errorlist = ' or '.join(names)
            d['errorlist'] = errorlist

        return (parselist, errorlist)

    def checkname(self, elt, ps):
        """Verify the element name and return its (namespaceURI, type).

        Elements in the SOAP.ENC namespace are matched against parselist
        and their own (ns, name) is returned.  Any other element must
        match ``nspname``/``pname`` when those are set; the result is then
        that of checktype.  Mismatches raise EvaluateException.
        """
        (parselist, errorlist) = self.get_parse_and_errorlist()
        (ns, name) = _get_element_nsuri_name(elt)
        if ns == SOAP.ENC:
            if (parselist and (None, name) not in parselist
                    and (ns, name) not in parselist):
                raise EvaluateException(
                    'Element mismatch (got %s wanted %s) '
                    '(SOAP encoding namespace)' % (name, errorlist),
                    ps.Backtrace(elt))
            return (ns, name)

        if self.nspname and ns != self.nspname:
            raise EvaluateException(
                'Element NS mismatch (got %s wanted %s)'
                % (ns, self.nspname), ps.Backtrace(elt))

        if self.pname and name != self.pname:
            raise EvaluateException(
                'Element Name mismatch (got %s wanted %s)'
                % (name, self.pname), ps.Backtrace(elt))

        return self.checktype(elt, ps)

    def checktype(self, elt, ps):
        """Check the xsi:type of ``elt`` and return (namespaceURI, name).

        Returns (None, None) when the element has no type.  An unresolved
        prefix falls back to ``self.nspname``.  The type is accepted when
        the class has no parselist, when it is listed there, or when its
        namespace is an XSD/SOAP one and (None, name) is listed; otherwise
        EvaluateException is raised.
        """
        typeName = _find_type(elt)
        if typeName is None or typeName == '':
            return (None, None)

        (prefix, typeName) = SplitQName(typeName)
        uri = ps.GetElementNSdict(elt).get(prefix)
        if uri is None:
            uri = self.nspname

        (parselist, errorlist) = self.get_parse_and_errorlist()
        if (not parselist
                or (uri, typeName) in parselist
                or (_is_xsd_or_soap_ns(uri)
                    and (None, typeName) in parselist)):
            return (uri, typeName)

        raise EvaluateException(
            'Type mismatch (%s namespace) (got %s wanted %s)'
            % (uri, typeName, errorlist), ps.Backtrace(elt))

    def name_match(self, elt):
        """Return true when ``elt`` has this typecode's local name and the
        typecode namespace is unset, empty or equal to the element's."""
        return (self.pname == elt.localName
                and self.nspname in [None, '', elt.namespaceURI])

    def nilled(self, elt, ps):
        """Return True when ``elt`` is marked nil ('true' or '1').

        Raises EvaluateException when the element is nil but this
        typecode is not nillable.
        """
        if _find_nil(elt) not in ('true', '1'):
            return False

        if self.nillable is False:
            raise EvaluateException('Non-nillable element is NIL',
                                    ps.Backtrace(elt))
        return True

    def simple_value(self, elt, ps, mixed=False):
        """Return the concatenated text and CDATA content of ``elt``.

        Unless ``mixed`` is true, an element without children or with a
        child element raises EvaluateException.  Nodes of other kinds are
        ignored.
        """
        if not _valid_encoding(elt):
            raise EvaluateException('Invalid encoding', ps.Backtrace(elt))

        c = _children(elt)
        if mixed is False:
            if len(c) == 0:
                raise EvaluateException('Value missing', ps.Backtrace(elt))
            for c_elt in c:
                if c_elt.nodeType == _Node.ELEMENT_NODE:
                    raise EvaluateException('Sub-elements in value',
                                            ps.Backtrace(c_elt))

        text_kinds = [_Node.TEXT_NODE, _Node.CDATA_SECTION_NODE]
        return ''.join([E.nodeValue for E in c if E.nodeType in text_kinds])

    def parse_attributes(self, elt, ps):
        """Parse the declared attributes present on ``elt``.

        Returns None when attribute_typecode_dict is None, else a dict
        keyed like attribute_typecode_dict holding the value produced by
        each attribute typecode's text_to_data.  Absent attributes are
        skipped.
        """
        if self.attribute_typecode_dict is None:
            return None

        attributes = {}
        for attr, what in self.attribute_typecode_dict.items():
            namespaceURI = None
            localName = attr
            if type(attr) in _seqtypes:
                (namespaceURI, localName) = attr

            value = _find_attrNodeNS(elt, namespaceURI, localName)
            self.logger.debug('Parsed Attribute (%s,%s) -- %s',
                              namespaceURI, localName, value)
            if value is None:
                continue

            attributes[attr] = what.text_to_data(value, elt, ps)

        return attributes

    def set_attributes(self, el, pyobj):
        """Write the attribute dictionary of ``pyobj`` onto ``el``.

        The dictionary is read from ``pyobj.<attrs_aname>``; nothing is
        done when that attribute is missing.  Raises TypeError when it is
        not a dict.  Each value is formatted by its declared typecode, by
        the value's own ``typecode`` attribute, or with str().
        """
        if not hasattr(pyobj, self.attrs_aname):
            return None

        attrs = getattr(pyobj, self.attrs_aname)
        if not isinstance(attrs, dict):
            raise TypeError(
                'pyobj.%s must be a dictionary of names and values'
                % self.attrs_aname)

        for attr, value in attrs.items():
            namespaceURI = None
            localName = attr
            if type(attr) in _seqtypes:
                (namespaceURI, localName) = attr

            what = None
            if getattr(self, 'attribute_typecode_dict', None) is not None:
                what = self.attribute_typecode_dict.get(attr)
                if what is None and namespaceURI is None:
                    what = self.attribute_typecode_dict.get(localName)

            if hasattr(value, 'typecode') and not isinstance(what, AnyType):
                # ``what`` is a typecode instance, so the subclass test has
                # to be made against its class; passing the instance to
                # isinstance() raises TypeError.
                if (what is not None
                        and not isinstance(value.typecode, what.__class__)):
                    raise EvaluateException(
                        'self-describing attribute must subclass %s'
                        % what.__class__)
                what = value.typecode

            self.logger.debug('attribute create -- %s', value)
            if isinstance(what, QName):
                what.set_prefix(el, value)

            if what is None:
                value = str(value)
            else:
                value = what.get_formatted_content(value)
            el.setAttributeNS(namespaceURI, localName, value)

    def set_attribute_xsi_type(self, el, **kw):
        """Set xsi:type on ``el`` when typing is enabled.

        kw may override ``typed`` and supply ``type`` as a
        (namespaceURI, name) pair; the default comes from _get_xsitype.
        Nothing is set unless both parts are true.
        """
        if kw.get('typed', self.typed):
            (namespaceURI, typeName) = kw.get('type', _get_xsitype(self))
            if namespaceURI and typeName:
                self.logger.debug('attribute: (%s, %s)',
                                  namespaceURI, typeName)
                el.setAttributeType(namespaceURI, typeName)

    def set_attribute_href(self, el, objid):
        """Set ``href="#<objid>"`` on ``el``."""
        el.setAttributeNS(None, 'href', '#%s' % objid)

    def set_attribute_id(self, el, objid):
        """Set ``id="<objid>"`` on ``el`` when this typecode is not
        unique."""
        if self.unique is False:
            el.setAttributeNS(None, 'id', '%s' % objid)

    def get_name(self, name, objid):
        """Return the (namespaceURI, name) used to serialize an element.

        A tuple ``name`` is returned as is.  Otherwise the namespace is
        ``self.nspname`` and the local name is the first true value of
        ``name``, ``self.pname`` and ``'E' + objid``.
        """
        if type(name) is tuple:
            return name

        n = name or self.pname or ('E' + objid)
        return (self.nspname, n)

    def has_attributes(self):
        """Return True when attribute_typecode_dict is non-empty."""
        if self.attribute_typecode_dict is None:
            return False
        return len(self.attribute_typecode_dict) > 0
-
-
-
class SimpleType(TypeCode):
    """Typecode whose value is the text content of a single element.

    Subclasses supply ``text_to_data`` and ``get_formatted_content``.

    Class attributes:
        empty_content -- text handed to text_to_data for an empty element.
        logger -- logger used by this class.
    """
    empty_content = None
    logger = _GetLogger('ZSI.TC.SimpleType')

    def parse(self, elt, ps):
        """Parse ``elt`` into a Python value.

        An empty element without href yields the conversion of
        ``empty_content``, or Nilled when it is nil.  A non-local href is
        resolved with ``ps.ResolveHREF``; a local one is followed and its
        type checked.  Parsed attributes, if any, are stored on the result
        under ``attrs_aname``.
        """
        self.checkname(elt, ps)
        if len(_children(elt)) == 0:
            href = _find_href(elt)
            if not href:
                if self.nilled(elt, ps) is False:
                    # No content, no href, not nil.
                    return self.text_to_data(self.empty_content, elt, ps)
                if self.nillable is True:
                    return Nilled
                raise EvaluateException('Requiredstring missing',
                                        ps.Backtrace(elt))

            if href[0] != '#':
                return ps.ResolveHREF(href, self)

            elt = ps.FindLocalHREF(href, elt)
            self.checktype(elt, ps)
            if self.nilled(elt, ps):
                return Nilled

            if len(_children(elt)) == 0:
                v = self.empty_content
            else:
                v = self.simple_value(elt, ps)
        else:
            v = self.simple_value(elt, ps)

        pyobj = self.text_to_data(v, elt, ps)
        if self.attribute_typecode_dict is not None:
            attributes = self.parse_attributes(elt, ps)
            if attributes:
                setattr(pyobj, self.attrs_aname, attributes)

        return pyobj

    def get_formatted_content(self, pyobj):
        """Return ``pyobj`` as text; must be provided by subclasses."""
        raise NotImplementedError(
            'method get_formatted_content is not implemented')

    def serialize_text_node(self, elt, sw, pyobj):
        """Append the formatted text of ``pyobj`` to ``elt``.

        Returns the new text node, or None when ``pyobj`` is None.  Raises
        TypeError when get_formatted_content does not return a string.
        """
        if pyobj is None:
            return None

        text = self.get_formatted_content(pyobj)
        if type(text) not in _stringtypes:
            raise TypeError('pyobj must be a formatted string')
        return elt.createAppendTextNode(text)

    def serialize(self, elt, sw, pyobj, name=None, orig=None, **kw):
        """Append an element for ``pyobj`` to ``elt``.

        Returns the new element, or None when only a nil marker or an
        href to an already known object was written.

        name -- overrides the element name (see get_name).
        orig -- object looked up with ``sw.Known`` instead of ``pyobj``.
        kw -- 'unique' and 'typed' override the instance settings.
        """
        objid = _get_idstr(pyobj)
        (ns, n) = self.get_name(name, objid)
        el = elt.createAppendElement(ns, n)
        if self.nillable is True and pyobj is Nilled:
            self.serialize_as_nil(el)
            return None

        self.set_attributes(el, pyobj)

        # Non-unique values already serialized are referenced by href.
        unique = self.unique or kw.get('unique', False)
        if unique is False and sw.Known(orig or pyobj):
            self.set_attribute_href(el, objid)
            return None

        if kw.get('typed', self.typed) is True:
            self.set_attribute_xsi_type(el, **kw)

        if self.unique is False:
            self.set_attribute_id(el, objid)

        self.serialize_text_node(el, sw, pyobj)
        return el
-
-
-
class Any(TypeCode):
    """Typecode that picks the concrete typecode at run time.

    Class attributes:
        parsemap -- maps (namespaceURI, typeName) to the typecode used
            for parsing.
        serialmap -- maps a Python type/class, or a
            (types.ClassType, className) pair, to the typecode used for
            serializing.
        logger -- logger used by this class.
    """
    logger = _GetLogger('ZSI.TC.Any')
    parsemap = {}
    serialmap = {}

    def __init__(self, pname=None, aslist=False, minOccurs=0, unique=False,
                 **kw):
        """aslist -- serialize sequences as a SOAP-ENC array.

        The remaining arguments go to TypeCode.__init__; ``aslist``,
        ``unique`` and ``kw`` are also kept in ``self.kwargs`` so that
        nested values can be handled by an equally configured instance.
        """
        TypeCode.__init__(self, pname, minOccurs=minOccurs, unique=unique,
                          **kw)
        self.aslist = aslist
        self.kwargs = dict(aslist=aslist, unique=unique)
        self.kwargs.update(kw)

    def listify(self, v):
        """Turn a list of (name, value) pairs into a dict."""
        return dict(v)

    def parse_into_dict_or_list(self, elt, ps):
        """Parse the child elements of ``elt`` into a dict keyed by their
        local names.

        An empty element without href yields an empty list.  A local href
        is followed when the element has no children.  A nil element
        yields Nilled.
        """
        c = _child_elements(elt)
        count = len(c)
        v = []
        if count == 0:
            href = _find_href(elt)
            if not href:
                return v

            elt = ps.FindLocalHREF(href, elt)
            self.checktype(elt, ps)
            c = _child_elements(elt)
            count = len(c)
            if count == 0:
                return self.listify(v)

        if self.nilled(elt, ps):
            return Nilled

        for c_elt in c:
            child_tc = self.__class__(**self.kwargs)
            v.append((str(c_elt.localName), child_tc.parse(c_elt, ps)))

        return self.listify(v)

    def parse(self, elt, ps):
        """Parse ``elt`` with the typecode registered for its xsi:type.

        Untyped elements and SOAP-ENC arrays are returned as plain text
        (no child elements) or through parse_into_dict_or_list.  Raises
        EvaluateException when a required value is missing or no parser
        is registered for the type.
        """
        (ns, typeName) = self.checkname(elt, ps)
        if not typeName and self.nilled(elt, ps):
            return Nilled

        if len(_children(elt)) == 0:
            href = _find_href(elt)
            if not href:
                if self.minOccurs < 1:
                    if _is_xsd_or_soap_ns(ns):
                        parser = Any.parsemap.get((None, typeName))
                        if parser:
                            return parser.parse(elt, ps)

                    # An empty array is reported as an empty list.
                    if ((ns, typeName) == (SOAP.ENC, 'Array')
                            or (_find_arraytype(elt) or '').endswith('[0]')):
                        return []
                    return None

                raise EvaluateException('Required Any missing',
                                        ps.Backtrace(elt))

            elt = ps.FindLocalHREF(href, elt)
            (ns, typeName) = self.checktype(elt, ps)

        if not typeName and elt.namespaceURI == SOAP.ENC:
            ns = SOAP.ENC
            typeName = elt.localName

        if not typeName or (ns, typeName) == (SOAP.ENC, 'Array'):
            if len(_child_elements(elt)) == 0:
                return self.simple_value(elt, ps)
            return self.parse_into_dict_or_list(elt, ps)

        parser = Any.parsemap.get((ns, typeName))
        if not parser and _is_xsd_or_soap_ns(ns):
            parser = Any.parsemap.get((None, typeName))

        if not parser:
            raise EvaluateException("Any can't parse element",
                                    ps.Backtrace(elt))

        return parser.parse(elt, ps)

    def _find_serializer(self, pyobj):
        """Return the typecode able to serialize ``pyobj``, or a false
        value when none is registered."""
        tc = type(pyobj)
        if tc == types.InstanceType:
            tc = pyobj.__class__
            if hasattr(pyobj, 'typecode'):
                serializer = pyobj.typecode
            else:
                serializer = Any.serialmap.get(tc)
            if not serializer:
                tc = (types.ClassType, pyobj.__class__.__name__)
                serializer = Any.serialmap.get(tc)
        else:
            serializer = Any.serialmap.get(tc)
            if not serializer and isinstance(pyobj, time.struct_time):
                # Imported here, not at module level, as in the original.
                from ZSI.TCtimes import gDateTime
                serializer = gDateTime()
        return serializer

    def get_formatted_content(self, pyobj):
        """Return ``pyobj`` formatted by its registered serializer.

        Raises EvaluateException when no serializer is found.
        """
        serializer = self._find_serializer(pyobj)
        if serializer:
            return serializer.get_formatted_content(pyobj)

        raise EvaluateException(
            'Failed to find serializer for pyobj %s' % pyobj)

    def serialize(self, elt, sw, pyobj, name=None, **kw):
        """Serialize ``pyobj`` below ``elt``; returns None.

        * An object with a ``typecode`` other than self is handed to that
          typecode.
        * A sequence becomes a SOAP-ENC array (``aslist``) or a plain
          element holding one child per item.
        * A dict becomes an element with one child per key; keys must be
          strings.
        * Anything else goes to its registered serializer; None is
          written as nil and an unregistered instance is serialized
          through its ``__dict__``.
        """
        if hasattr(pyobj, 'typecode') and pyobj.typecode is not self:
            pyobj.typecode.serialize(elt, sw, pyobj, **kw)
            return None

        objid = _get_idstr(pyobj)
        (ns, n) = self.get_name(name, objid)
        kw.setdefault('typed', self.typed)
        tc = type(pyobj)
        self.logger.debug('Any serialize -- %s', tc)

        if tc in _seqtypes:
            if self.aslist:
                array = elt.createAppendElement(ns, n)
                array.setAttributeType(SOAP.ENC, 'Array')
                array.setAttributeNS(
                    self.nspname, 'SOAP-ENC:arrayType',
                    'xsd:anyType[' + str(len(pyobj)) + ']')
                for o in pyobj:
                    serializer = getattr(o, 'typecode', Any(**self.kwargs))
                    serializer.serialize(array, sw, o, name='element', **kw)
            else:
                struct = elt.createAppendElement(ns, n)
                for o in pyobj:
                    serializer = getattr(o, 'typecode', Any(**self.kwargs))
                    serializer.serialize(struct, sw, o, **kw)
            return None

        kw['name'] = (ns, n)
        if tc == types.DictType:
            el = elt.createAppendElement(ns, n)
            # Children are written without a namespace; restore ours even
            # when a child fails to serialize.
            parentNspname = self.nspname
            self.nspname = None
            try:
                for o, m in pyobj.items():
                    if type(o) not in (types.StringType, types.UnicodeType):
                        raise Exception(
                            'Dictionary implementation requires keys to '
                            'be of type string (or unicode).')
                    kw['name'] = o
                    kw.setdefault('typed', True)
                    self.serialize(el, sw, m, **kw)
            finally:
                self.nspname = parentNspname
            return None

        serializer = self._find_serializer(pyobj)
        if not serializer:
            if pyobj is None:
                self.serialize_as_nil(elt.createAppendElement(ns, n))
            elif type(pyobj) != types.InstanceType:
                raise EvaluateException("Any can't serialize " + repr(pyobj))
            else:
                self.serialize(elt, sw, pyobj.__dict__, **kw)
        else:
            tag = getattr(serializer, 'tag', None)
            if self.pname is not None:
                if 'typed' not in kw:
                    kw['typed'] = False
            elif tag:
                if tag.find(':') == -1:
                    tag = 'SOAP-ENC:' + tag
                kw['name'] = tag
                kw['typed'] = False

            serializer.unique = self.unique
            serializer.serialize(elt, sw, pyobj, **kw)
-
-
-
class String(SimpleType):
    """Typecode for xsd:string values.

    Parsed text is encoded with UNICODE_ENCODING and optionally wrapped
    in ``pyclass``.
    """
    empty_content = ''
    parselist = [(None, 'string')]
    seriallist = [types.StringType, types.UnicodeType]
    type = (SCHEMA.XSD3, 'string')
    logger = _GetLogger('ZSI.TC.String')

    def __init__(self, pname=None, strip=True, **kw):
        """strip -- strip surrounding whitespace from parsed text.

        A 'resolver' keyword, when given, is stored as ``self.resolver``.
        """
        TypeCode.__init__(self, pname, **kw)
        if 'resolver' in kw:
            self.resolver = kw['resolver']
        self.strip = strip

    def text_to_data(self, text, elt, ps):
        """Return ``text`` (stripped if requested) as an encoded string,
        passed through ``pyclass`` when one is set."""
        if self.strip:
            text = text.strip()

        encoded = text.encode(UNICODE_ENCODING)
        if self.pyclass is not None:
            return self.pyclass(encoded)
        return encoded

    def get_formatted_content(self, pyobj):
        """Return ``pyobj`` as a byte string; non-strings go through
        str() and unicode is encoded with UNICODE_ENCODING."""
        if type(pyobj) not in _stringtypes:
            pyobj = str(pyobj)

        if type(pyobj) == unicode:
            return pyobj.encode(UNICODE_ENCODING)
        return pyobj
-
-
-
class URI(String):
    """Typecode for xsd:anyURI; values are URL-unquoted when parsed and
    URL-quoted when serialized."""
    parselist = [(None, 'anyURI'), (SCHEMA.XSD3, 'anyURI')]
    type = (SCHEMA.XSD3, 'anyURI')
    logger = _GetLogger('ZSI.TC.URI')
    # Characters left unquoted by get_formatted_content.
    reserved = ';/?:@&=+$,'

    def text_to_data(self, text, elt, ps):
        """URL-unquote ``text`` and convert it like a String."""
        return String.text_to_data(self, urldecode(text), elt, ps)

    def get_formatted_content(self, pyobj):
        """URL-quote ``pyobj`` (keeping ``reserved`` characters) and
        format it like a String."""
        quoted = urlencode(pyobj, self.reserved)
        return String.get_formatted_content(self, quoted)
-
-
-
class QName(String):
    """Typecode for xsd:QName; the Python value is a
    (namespaceURI, localName) tuple."""
    parselist = [(None, 'QName')]
    type = (SCHEMA.XSD3, 'QName')
    logger = _GetLogger('ZSI.TC.QName')

    def __init__(self, pname=None, strip=1, **kw):
        """Same arguments as String; ``self.prefix`` starts as None and is
        filled in by set_prefix."""
        String.__init__(self, pname, strip, **kw)
        self.prefix = None

    def get_formatted_content(self, pyobj):
        """Return ``prefix:localName`` for a tuple value once a prefix is
        known; any other value is formatted like a String."""
        value = pyobj
        if isinstance(pyobj, tuple):
            (namespaceURI, localName) = pyobj
            if self.prefix is not None:
                value = '%s:%s' % (self.prefix, localName)

        return String.get_formatted_content(self, value)

    def set_prefix(self, elt, pyobj):
        """Remember the prefix ``elt`` reports for the namespace of a
        tuple value; other values are ignored."""
        if isinstance(pyobj, tuple):
            (namespaceURI, localName) = pyobj
            self.prefix = elt.getPrefix(namespaceURI)

    def text_to_data(self, text, elt, ps):
        """Resolve ``text`` to a (namespaceURI, localName) tuple.

        A missing prefix is looked up as ''.  Raises EvaluateException
        when the prefix is not in the element's namespace dictionary.
        """
        (prefix, localName) = SplitQName(text)
        nsdict = ps.GetElementNSdict(elt)
        prefix = prefix or ''

        try:
            namespaceURI = nsdict[prefix]
        except KeyError:
            raise EvaluateException('cannot resolve prefix(%s)' % prefix,
                                    ps.Backtrace(elt))

        v = (namespaceURI, localName)
        if self.pyclass is not None:
            return self.pyclass(v)
        return v

    def serialize_text_node(self, elt, sw, pyobj):
        """Record the prefix for ``pyobj`` and then append its text."""
        self.set_prefix(elt, pyobj)
        return String.serialize_text_node(self, elt, sw, pyobj)
-
-
-
class Token(String):
    """Typecode for xsd:token; behaves exactly like String."""
    parselist = [(None, 'token')]
    type = (SCHEMA.XSD3, 'token')
    logger = _GetLogger('ZSI.TC.Token')
-
-
class Base64String(String):
    """Typecode for SOAP-ENC base64 values."""
    parselist = [(None, 'base64Binary'), (SOAP.ENC, 'base64')]
    type = (SOAP.ENC, 'base64')
    logger = _GetLogger('ZSI.TC.Base64String')

    def text_to_data(self, text, elt, ps):
        """Decode base64 ``text`` after removing spaces and line breaks;
        the result goes through ``pyclass`` when one is set."""
        compact = text.replace(' ', '').replace('\n', '').replace('\r', '')
        val = b64decode(compact)
        if self.pyclass is not None:
            return self.pyclass(val)
        return val

    def get_formatted_content(self, pyobj):
        """Return ``pyobj`` base64-encoded, preceded by a newline."""
        encoded = '\n' + b64encode(pyobj)
        return String.get_formatted_content(self, encoded)
-
-
-
class Base64Binary(String):
    """Typecode for xsd:base64Binary values."""
    parselist = [(None, 'base64Binary')]
    type = (SCHEMA.XSD3, 'base64Binary')
    logger = _GetLogger('ZSI.TC.Base64Binary')

    def text_to_data(self, text, elt, ps):
        """Decode base64 ``text``; the result goes through ``pyclass``
        when one is set."""
        val = b64decode(text)
        if self.pyclass is not None:
            return self.pyclass(val)
        return val

    def get_formatted_content(self, pyobj):
        """Return ``pyobj`` base64-encoded with surrounding whitespace
        removed."""
        return b64encode(pyobj).strip()
-
-
-
class HexBinaryString(String):
    """Typecode for xsd:hexBinary values."""
    parselist = [(None, 'hexBinary')]
    type = (SCHEMA.XSD3, 'hexBinary')
    logger = _GetLogger('ZSI.TC.HexBinaryString')

    def text_to_data(self, text, elt, ps):
        """Decode hexadecimal ``text``; the result goes through
        ``pyclass`` when one is set."""
        val = hexdecode(text)
        if self.pyclass is not None:
            return self.pyclass(val)
        return val

    def get_formatted_content(self, pyobj):
        """Return ``pyobj`` as upper-case hexadecimal text."""
        encoded = hexencode(pyobj).upper()
        return String.get_formatted_content(self, encoded)
-
-
-
class XMLString(String):
    """String typecode whose parsed text is handed to an XML reader."""
    logger = _GetLogger('ZSI.TC.XMLString')

    def __init__(self, pname=None, readerclass=None, **kw):
        """readerclass -- class whose instances provide ``fromString``;
        when false, PyExpat.Reader is used at parse time."""
        String.__init__(self, pname, **kw)
        self.readerclass = readerclass

    def parse(self, elt, ps):
        """Parse the element as a String and return
        ``readerclass().fromString(text)``."""
        if not self.readerclass:
            # Imported only when no reader class was supplied.
            from xml.dom.ext.reader import PyExpat
            self.readerclass = PyExpat.Reader

        v = String.parse(self, elt, ps)
        return self.readerclass().fromString(v)

    def get_formatted_content(self, pyobj):
        """Format ``pyobj`` exactly like String does."""
        return String.get_formatted_content(self, pyobj)
-
-
-
class Enumeration(String):
    """String typecode restricted to a fixed set of choices."""
    logger = _GetLogger('ZSI.TC.Enumeration')

    def __init__(self, choices, pname=None, **kw):
        """choices -- sequence of the permitted string values.

        With TypeCode.typechecks enabled, a non-sequence or a non-string
        choice raises TypeError.
        """
        String.__init__(self, pname, **kw)
        t = type(choices)
        if t not in _seqtypes and TypeCode.typechecks:
            raise TypeError(
                'Enumeration choices must be list or sequence, not '
                + str(t))

        # Always set the attribute so parse/serialize cannot fail with an
        # AttributeError when type checks are switched off.
        self.choices = tuple(choices)

        if TypeCode.typechecks:
            for c in self.choices:
                if type(c) not in _stringtypes:
                    raise TypeError(
                        'Enumeration choice ' + str(c) + ' is not a string')

    def parse(self, elt, ps):
        """Parse like String; raises EvaluateException for a value that is
        not one of the choices."""
        val = String.parse(self, elt, ps)
        if val not in self.choices:
            raise EvaluateException('Value not in enumeration list',
                                    ps.Backtrace(elt))
        return val

    def serialize(self, elt, sw, pyobj, name=None, orig=None, **kw):
        """Serialize like String; raises EvaluateException for a value
        that is not one of the choices."""
        if pyobj not in self.choices:
            raise EvaluateException('Value not in enumeration list',
                                    sw.Backtrace(elt))

        return String.serialize(self, elt, sw, pyobj, name=name, orig=orig,
                                **kw)
-
-
# Sentinel meaning "no bound on this side" in Integer.ranges.
_ignored = []


class Integer(SimpleType):
    """Typecode for the XML Schema integer types.

    Class attributes:
        ranges -- maps a type name to its (minimum, maximum); ``_ignored``
            marks an open bound.
        parselist -- one (None, name) entry per key of ``ranges``.
    """
    ranges = {
        'unsignedByte': (0, 255),
        'unsignedShort': (0, 65535),
        'unsignedInt': (0, 0xFFFFFFFF),
        'unsignedLong': (0, 0xFFFFFFFFFFFFFFFF),
        'byte': (-128, 127),
        'short': (-32768, 32767),
        'int': (-0x80000000, 2147483647),
        'long': (-0x8000000000000000, 0x7FFFFFFFFFFFFFFF),
        'negativeInteger': (_ignored, -1),
        'nonPositiveInteger': (_ignored, 0),
        'nonNegativeInteger': (0, _ignored),
        'positiveInteger': (1, _ignored),
        'integer': (_ignored, _ignored),
    }
    parselist = [(None, k) for k in ranges.keys()]
    seriallist = [types.IntType, types.LongType]
    logger = _GetLogger('ZSI.TC.Integer')

    def __init__(self, pname=None, format='%d', **kw):
        """format -- %-format used by get_formatted_content."""
        TypeCode.__init__(self, pname, **kw)
        self.format = format

    def text_to_data(self, text, elt, ps):
        """Convert ``text`` with ``pyclass`` when set, else with int().

        Raises EvaluateException when int() rejects the text.
        """
        if self.pyclass is not None:
            return self.pyclass(text)

        try:
            # int() already returns a long for values beyond the machine
            # word, so no separate long() attempt is needed.
            return int(text)
        except (TypeError, ValueError):
            raise EvaluateException('Unparseable integer',
                                    ps.Backtrace(elt))

    def parse(self, elt, ps):
        """Parse ``elt`` and range-check the value against its type.

        Returns Nilled for a nil element and None when an optional value
        is absent.  Raises EvaluateException on a type mismatch or when
        the value lies outside the range registered for the type.
        """
        (ns, typeName) = self.checkname(elt, ps)
        if self.nilled(elt, ps):
            return Nilled

        elt = self.SimpleHREF(elt, ps, 'integer')
        if not elt:
            return None

        expected = self.type[1]
        if typeName is None:
            typeName = expected
        elif expected is not None and typeName != expected:
            raise EvaluateException(
                'Integer type mismatch; got %s wanted %s'
                % (typeName, expected), ps.Backtrace(elt))

        v = self.simple_value(elt, ps)
        v = self.text_to_data(v, elt, ps)
        (rmin, rmax) = Integer.ranges.get(typeName, (_ignored, _ignored))
        if rmin != _ignored and v < rmin:
            raise EvaluateException('Underflow, less than ' + repr(rmin),
                                    ps.Backtrace(elt))

        if rmax != _ignored and v > rmax:
            raise EvaluateException('Overflow, greater than ' + repr(rmax),
                                    ps.Backtrace(elt))

        return v

    def get_formatted_content(self, pyobj):
        """Return ``pyobj`` rendered with ``self.format``."""
        return self.format % pyobj
-
-
-
def _make_inf():
    """Build positive infinity by repeated squaring.

    Starts at 2.0 and squares until the value stops changing, which for
    floats happens once it overflows to infinity.  Raises ValueError when
    that has not happened after 100 rounds.
    """
    # Must be a float: with an int the squares grow without bound
    # instead of saturating.
    x = 2.0
    x2 = x * x
    i = 0
    while i < 100 and x != x2:
        x = x2
        x2 = x * x
        i = i + 1

    if x != x2:
        raise ValueError("This machine's floats go on forever!")

    return x
-
# Maps the special lexical values 'INF' and '-INF' to the matching floats.
_magicnums = {}

try:
    _magicnums['INF'] = float('INF')
    _magicnums['-INF'] = float('-INF')
except ValueError:
    # float() does not accept 'INF' here; compute the value instead.
    _magicnums['INF'] = _make_inf()
    _magicnums['-INF'] = -_magicnums['INF']
-
-
def isnan(x):
    """Return a true value if and only if ``x`` is a NaN.

    NaN is the only float that compares unequal to itself, so a single
    self-comparison is enough; the earlier ``x == 1 and x == 2`` probe
    reports False for NaN on interpreters with IEEE 754 comparisons.
    """
    return x != x
-
-
class Decimal(SimpleType):
    """Typecode for xsd:decimal, xsd:float and xsd:double values.

    Class attributes:
        ranges -- maps a type name to (tiny, most negative, most
            positive) limits used by parse.
        zeropat -- matches any non-zero digit; used to detect text that
            was rounded to zero.
    """
    parselist = [(None, 'decimal'), (None, 'float'), (None, 'double')]
    seriallist = _floattypes
    type = None
    ranges = {
        'float': (7.00649e-46, -3.40282e+38, 3.40282e+38),
        'double': (0, -1.79769e+308, 1.79769e+308),
    }
    zeropat = re.compile('[1-9]')
    logger = _GetLogger('ZSI.TC.Decimal')

    def __init__(self, pname=None, format='%f', **kw):
        """format -- %-format used by get_formatted_content."""
        TypeCode.__init__(self, pname, **kw)
        self.format = format

    def text_to_data(self, text, elt, ps):
        """Convert ``text`` to a float.

        ``pyclass`` takes precedence when set; 'INF' and '-INF' map to
        the infinities.  Raises EvaluateException when float() rejects
        the text.
        """
        v = text
        if self.pyclass is not None:
            return self.pyclass(v)

        m = _magicnums.get(v)
        if m:
            return m

        try:
            return float(v)
        except (TypeError, ValueError):
            raise EvaluateException('Unparseable floating point number',
                                    ps.Backtrace(elt))

    def parse(self, elt, ps):
        """Parse ``elt`` into a float and validate it.

        Returns None when an optional value is absent and Nilled for a
        nil element.  Raises EvaluateException on a type mismatch, on a
        value that parsed as inf/nan without being spelled 'INF'/'-INF',
        on non-zero text that parsed as zero, and on values outside the
        range registered for the type.
        """
        (ns, typeName) = self.checkname(elt, ps)
        elt = self.SimpleHREF(elt, ps, 'floating-point')
        if not elt:
            return None

        tag = getattr(self.__class__, 'type')
        if tag:
            if typeName is None:
                # NOTE(review): ``tag`` is presumably a (ns, name) pair
                # here, so the ``ranges`` lookup below cannot match and
                # no range check happens on this path -- confirm intent.
                typeName = tag
            elif tag != (ns, typeName):
                raise EvaluateException(
                    'Floating point type mismatch; got (%s,%s) wanted %s'
                    % (ns, typeName, tag), ps.Backtrace(elt))

        if self.nilled(elt, ps):
            return Nilled

        v = self.simple_value(elt, ps)
        # text_to_data already attaches the backtrace to its exception,
        # so it is allowed to propagate unchanged.
        fp = self.text_to_data(v, elt, ps)

        m = _magicnums.get(v)
        if m:
            return m

        if str(fp).lower() in ('inf', '-inf', 'nan', '-nan'):
            raise EvaluateException(
                'Floating point number parsed as "' + str(fp) + '"',
                ps.Backtrace(elt))

        if fp == 0 and Decimal.zeropat.search(v):
            raise EvaluateException('Floating point number parsed as zero',
                                    ps.Backtrace(elt))

        (rtiny, rneg, rpos) = Decimal.ranges.get(typeName,
                                                 (None, None, None))
        if rneg and fp < 0 and fp < rneg:
            raise EvaluateException('Negative underflow', ps.Backtrace(elt))

        if rtiny and fp > 0 and fp < rtiny:
            raise EvaluateException('Positive underflow', ps.Backtrace(elt))

        if rpos and fp > 0 and fp > rpos:
            raise EvaluateException('Overflow', ps.Backtrace(elt))

        return fp

    def get_formatted_content(self, pyobj):
        """Return 'INF', '-INF' or 'NaN' for the special values, else
        ``pyobj`` rendered with ``self.format``."""
        if pyobj == _magicnums['INF']:
            return 'INF'
        if pyobj == _magicnums['-INF']:
            return '-INF'
        if isnan(pyobj):
            return 'NaN'
        return self.format % pyobj
-
-
-
class Boolean(SimpleType):
    """Typecode for xsd:boolean values."""
    parselist = [(None, 'boolean')]
    seriallist = [bool]
    type = (SCHEMA.XSD3, 'boolean')
    logger = _GetLogger('ZSI.TC.Boolean')

    def text_to_data(self, text, elt, ps):
        """Convert ``text`` to a boolean.

        'true' and 'false' map directly; any other text must be an
        integer and is true when non-zero.  The result goes through
        ``pyclass`` when one is set.  Raises EvaluateException for text
        that is neither.
        """
        if text == 'false':
            flag = False
        elif text == 'true':
            flag = True
        else:
            try:
                # int() already returns a long for large values, so no
                # separate long() attempt is needed.
                number = int(text)
            except (TypeError, ValueError):
                raise EvaluateException('Unparseable boolean',
                                        ps.Backtrace(elt))
            if number:
                flag = True
            else:
                flag = False

        if self.pyclass is None:
            return flag
        return self.pyclass(flag)

    def parse(self, elt, ps):
        """Parse ``elt``; the text is lower-cased before conversion.

        Returns None when an optional value is absent and Nilled for a
        nil element.
        """
        self.checkname(elt, ps)
        elt = self.SimpleHREF(elt, ps, 'boolean')
        if not elt:
            return None

        if self.nilled(elt, ps):
            return Nilled

        v = self.simple_value(elt, ps).lower()
        return self.text_to_data(v, elt, ps)

    def get_formatted_content(self, pyobj):
        """Return 'true' for a true ``pyobj``, else 'false'."""
        if pyobj:
            return 'true'
        return 'false'
-
-
-
class XML(TypeCode):
    """Typecode for an embedded XML node.

    Class attributes:
        copyit -- default for the per-instance ``copyit`` flag; when true
            parse returns a deep clone instead of the node itself.
    """
    copyit = 0
    logger = _GetLogger('ZSI.TC.XML')

    def __init__(self, pname=None, comments=0, inline=0, wrapped=True,
                 **kw):
        """comments, inline -- stored on the instance.
        wrapped -- when False the element itself is the value; otherwise
            the value is the single child of a wrapper element.
        kw -- 'resolver' and 'copyit' are read here; everything is also
            forwarded to TypeCode.__init__.
        """
        TypeCode.__init__(self, pname, **kw)
        self.comments = comments
        self.inline = inline
        if 'resolver' in kw:
            self.resolver = kw['resolver']

        self.wrapped = wrapped
        self.copyit = kw.get('copyit', XML.copyit)

    def parse(self, elt, ps):
        """Return the embedded node of ``elt``.

        Unwrapped typecodes return ``elt`` itself.  Otherwise the single
        child element is returned (cloned when ``copyit`` is true),
        following an href when the element has no child elements.
        Returns None when the document is absent and minOccurs is 0.
        Raises EvaluateException when the document is missing or the
        wrapper does not hold exactly one child element.
        """
        if self.wrapped is False:
            return elt

        c = _child_elements(elt)
        if not c:
            href = _find_href(elt)
            if not href:
                if self.minOccurs == 0:
                    return None
                raise EvaluateException('Embedded XML document missing',
                                        ps.Backtrace(elt))

            if href[0] != '#':
                return ps.ResolveHREF(href, self)

            elt = ps.FindLocalHREF(href, elt)
            c = _child_elements(elt)

        if _find_encstyle(elt) != '':
            # An encodingStyle on embedded XML is deliberately tolerated.
            pass

        if len(c) != 1:
            raise EvaluateException('Embedded XML has more than one child',
                                    ps.Backtrace(elt))

        if self.copyit:
            return c[0].cloneNode(1)
        return c[0]

    def serialize(self, elt, sw, pyobj, name=None, unsuppressedPrefixes=[],
                  **kw):
        """Append ``pyobj`` below ``elt``, inside a wrapper element when
        ``wrapped`` is true.  ``unsuppressedPrefixes`` is only passed on
        to cb and never modified."""
        objid = _get_idstr(pyobj)
        (ns, n) = self.get_name(name, objid)
        xmlelt = elt
        if self.wrapped:
            xmlelt = elt.createAppendElement(ns, n)

        self.cb(xmlelt, sw, pyobj, unsuppressedPrefixes)

    def cb(self, elt, sw, pyobj, unsuppressedPrefixes=[]):
        """Append ``pyobj`` to ``elt``.

        A string becomes a text node.  A DOM node is imported (deep) into
        the element's document and appended; ``xmlns:`` attributes of its
        original ancestor elements that the copy lacks are cloned onto
        it.
        """
        if type(pyobj) in _stringtypes:
            elt.createAppendTextNode(pyobj)
            return None

        doc = elt.getDocument()
        node = doc.importNode(pyobj, deep=1)
        child = elt.node.appendChild(node)

        parent = pyobj.parentNode
        while parent.nodeType == _Node.ELEMENT_NODE:
            for attr in parent.attributes:
                if (attr.name.startswith('xmlns:')
                        and attr.name not in child.attributes.keys()):
                    child.setAttributeNode(attr.cloneNode(1))

            parent = parent.parentNode
-
-
-
class AnyType(TypeCode):
    """Typecode for xsd:anyType; the concrete typecode is chosen from the
    object (serialize) or from the element's xsi:type (parse).

    Class attributes:
        all, other -- the '#all' and '#other' namespace markers.
    """
    all = '#all'
    other = '#other'
    type = (SCHEMA.XSD3, 'anyType')
    logger = _GetLogger('ZSI.TC.AnyType')

    def __init__(self, pname=None, namespaces=['#all'], minOccurs=1,
                 maxOccurs=1, strip=1, **kw):
        """namespaces -- namespaces accepted by serialize; the list is
            stored as given and not modified here.
        strip -- accepted but not used by this class.
        """
        TypeCode.__init__(self, pname=pname, minOccurs=minOccurs,
                          maxOccurs=maxOccurs, **kw)
        self.namespaces = namespaces

    def get_formatted_content(self, pyobj):
        """Format ``pyobj`` with its own ``typecode`` or, lacking one,
        with a default Any."""
        what = getattr(pyobj, 'typecode', Any())
        return what.get_formatted_content(pyobj)

    def text_to_data(self, text, elt, ps):
        """Return ``text`` unchanged."""
        return text

    def serialize(self, elt, sw, pyobj, **kw):
        """Serialize ``pyobj`` with its own typecode or with Any.

        Raises EvaluateException when the object's xsi:type namespace is
        not accepted by ``self.namespaces``.  An object without a
        typecode is written by a typed Any carrying this element's name.
        """
        (nsuri, typeName) = _get_xsitype(pyobj)
        if self.all not in self.namespaces and nsuri not in self.namespaces:
            raise EvaluateException(
                '<anyType> unsupported use of namespaces "%s"'
                % self.namespaces)

        what = getattr(pyobj, 'typecode', None)
        if what is None:
            what = Any(pname=(self.nspname, self.pname), unique=True,
                       aslist=False)
            kw['typed'] = True
            what.serialize(elt, sw, pyobj, **kw)
            return None

        # This element's own name wins over the typecode's name.
        name = (self.nspname or what.nspname, self.pname or what.pname)
        what.serialize(elt, sw, pyobj, name=name, **kw)

    def parse(self, elt, ps):
        """Parse ``elt`` with the typecode found for its xsi:type.

        The element must carry this typecode's namespace and name, else
        EvaluateException is raised.  When GTD finds no class, XSD/SOAP
        types go to Any, the Apache Map type to Apache.Map, and anything
        else is parsed with Any.parse_into_dict_or_list.
        """
        (nspname, pname) = _get_element_nsuri_name(elt)
        if nspname != self.nspname or pname != self.pname:
            raise EvaluateException(
                '<anyType> instance is (%s,%s) found (%s,%s)'
                % (self.nspname, self.pname, nspname, pname),
                ps.Backtrace(elt))

        (prefix, typeName) = SplitQName(_find_type(elt))
        namespaceURI = _resolve_prefix(elt, prefix)
        pyclass = GTD(namespaceURI, typeName)
        if not pyclass:
            map_ns, map_name = Apache.Map.type[0], Apache.Map.type[1]
            if _is_xsd_or_soap_ns(namespaceURI):
                pyclass = Any
            elif (str(namespaceURI).lower() == str(map_ns).lower()
                    and str(typeName).lower() == str(map_name).lower()):
                pyclass = Apache.Map
            else:
                return Any().parse_into_dict_or_list(elt, ps)

        what = pyclass(pname=(self.nspname, self.pname))
        return what.parse(elt, ps)
-
-
-
class AnyElement(AnyType):
    """Typecode for the xsd:any wildcard element."""
    tag = (SCHEMA.XSD3, 'any')
    logger = _GetLogger('ZSI.TC.AnyElement')

    def __init__(self, namespaces=['#all'], pname=None, minOccurs=1,
                 maxOccurs=1, strip=1, processContents='strict', **kw):
        """processContents -- 'lax', 'skip' or 'strict'; any other value
            raises ValueError.

        The remaining arguments are passed to AnyType.__init__.
        """
        if processContents not in ('lax', 'skip', 'strict'):
            raise ValueError(
                'processContents(%s) must be lax, skip, or strict'
                % processContents)

        self.processContents = processContents
        AnyType.__init__(self, namespaces=namespaces, pname=pname,
                         minOccurs=minOccurs, maxOccurs=maxOccurs,
                         strip=strip, **kw)

    def _attach_typecode(self, pyobj, what):
        """Record ``what`` as the typecode of ``pyobj``; objects that do
        not accept attributes are wrapped with WrapImmutable."""
        try:
            pyobj.typecode = what
        except AttributeError:
            pyobj = WrapImmutable(pyobj, what)
        return pyobj

    def serialize(self, elt, sw, pyobj, **kw):
        """Serialize ``pyobj`` with its typecode or, when none is found,
        with Any.

        For old-style instances that carry a typecode the serializer is
        looked up in Any.serialmap by class, then by
        (types.ClassType, className).  Raises TypeError when ``pyobj`` is
        itself a typecode.
        """
        if isinstance(pyobj, TypeCode):
            raise TypeError('pyobj is a typecode instance.')

        what = getattr(pyobj, 'typecode', None)
        if what is not None and type(pyobj) is types.InstanceType:
            tc = pyobj.__class__
            what = Any.serialmap.get(tc)
            if not what:
                tc = (types.ClassType, pyobj.__class__.__name__)
                what = Any.serialmap.get(tc)

        self.logger.debug('processContents: %s', self.processContents)
        if what is None:
            what = Any(pname=(self.nspname, self.pname))

        self.logger.debug('serialize with %s', what.__class__.__name__)
        what.serialize(elt, sw, pyobj, **kw)

    def parse(self, elt, ps):
        """Parse ``elt`` according to ``processContents``.

        Unless skipping, a global element declaration (GED) and then the
        element's xsi:type (GTD, falling back to Any) are tried.
        Otherwise 'skip' returns the element through an unwrapped XML
        typecode, and 'lax'/'strict' parse with Any, falling back to a
        dict (element has children) or a String when that fails.  Results
        other than None and dicts carry the typecode that produced them.
        """
        skip = self.processContents == 'skip'
        (nspname, pname) = _get_element_nsuri_name(elt)
        what = GED(nspname, pname)
        if not skip and what is not None:
            pyobj = what.parse(elt, ps)
            return self._attach_typecode(pyobj, what)

        (prefix, typeName) = SplitQName(_find_type(elt))
        if not skip and typeName:
            namespaceURI = _resolve_prefix(elt, prefix or 'xmlns')
            pyclass = GTD(namespaceURI, typeName) or Any
            what = pyclass(pname=(nspname, pname))
            pyobj = what.parse(elt, ps)
            pyobj = self._attach_typecode(pyobj, what)
            what.typed = True
            return pyobj

        if skip:
            what = XML(pname=(nspname, pname), wrapped=False)
        else:
            # 'lax' and 'strict' are handled identically here.
            what = Any(pname=(nspname, pname), unique=True)

        try:
            pyobj = what.parse(elt, ps)
        except EvaluateException:
            # sys.exc_info keeps the handler valid on every Python
            # version this module may be read with.
            import sys
            ex = sys.exc_info()[1]
            self.logger.debug('error parsing: %s' % str(ex))
            if len(_children(elt)) != 0:
                self.logger.debug('parse <any>, return as dict')
                return Any(aslist=False).parse_into_dict_or_list(elt, ps)

            self.logger.debug('Give up, parse (%s,%s) as a String',
                              what.nspname, what.pname)
            what = String(pname=(nspname, pname), typed=False)
            return WrapImmutable(what.parse(elt, ps), what)

        if pyobj is None:
            return None

        if type(pyobj) is dict:
            return pyobj

        return self._attach_typecode(pyobj, what)
-
-
-
class Union(SimpleType):
    """simpleType union: a value that may match any one of several types.

    Class variables:
        memberTypes -- sequence of (namespace, name) pairs, one per member
            type definition; must be set by subclasses.
        logger -- logger for this typecode.

    Instance variables:
        memberTypeCodes -- typecode instances built lazily from memberTypes,
            kept with the most recently successful one first.
    """
    memberTypes = None
    logger = _GetLogger('ZSI.TC.Union')

    def __init__(self, pname=None, minOccurs=1, maxOccurs=1, **kw):
        """Initialise the SimpleType base and an empty member typecode list."""
        SimpleType.__init__(self, pname=pname, minOccurs=minOccurs,
                            maxOccurs=maxOccurs, **kw)
        self.memberTypeCodes = []

    def setMemberTypeCodes(self):
        """Populate ``memberTypeCodes`` from the class's ``memberTypes``.

        Does nothing when the list is already populated.  Each member is
        looked up with GTD; if that yields nothing, the instance registered
        in Any.parsemap under (nsuri, name), or failing that (None, name),
        supplies the typecode class.

        Raises:
            EvaluateException -- memberTypes is unset, a member has no
                typecode, or a member typecode is a Struct.
        """
        if len(self.memberTypeCodes) > 0:
            return None

        if self.__class__.memberTypes is None:
            raise EvaluateException(
                'uninitialized class variable memberTypes [(namespace,name),]')

        # Build into a local list so a failure part-way through does not
        # leave a partially filled list that the early return above would
        # later mistake for a complete one.
        resolved = []
        for nsuri, name in self.__class__.memberTypes:
            tcclass = GTD(nsuri, name)
            if tcclass is None:
                tc = (Any.parsemap.get((nsuri, name))
                      or Any.parsemap.get((None, name)))
                if tc is None:
                    raise EvaluateException(
                        'Typecode class for Union memberType (%s,%s) is missing'
                        % (nsuri, name))
                tcclass = tc.__class__

            typecode = tcclass(pname=(self.nspname, self.pname))
            if isinstance(typecode, Struct):
                raise EvaluateException(
                    'Illegal: Union memberType (%s,%s) is complexType'
                    % (nsuri, name))

            resolved.append(typecode)

        self.memberTypeCodes.extend(resolved)

    def _promote(self, indx):
        """Move the member typecode at ``indx`` to the front of the list."""
        if indx > 0:
            self.memberTypeCodes.insert(0, self.memberTypeCodes.pop(indx))

    def parse(self, elt, ps, **kw):
        """Parse ``elt`` with the first member typecode that accepts it.

        Member typecodes are tried in order; the one that succeeds is moved
        to the front so it is tried first next time.

        elt -- the DOM element being parsed
        ps -- the parser state object handed to the member typecodes

        If every member fails, the bare ``raise`` re-raises the last member's
        exception (Python 2 semantics).
        """
        self.setMemberTypeCodes()
        # Called for its checks only; the return value is not needed.
        self.checkname(elt, ps)
        for indx, typecode in enumerate(self.memberTypeCodes):
            try:
                pyobj = typecode.parse(elt, ps)
            except (ParseException, Exception):
                continue

            self._promote(indx)
            break
        else:
            raise
        return pyobj

    def get_formatted_content(self, pyobj, **kw):
        """Format ``pyobj`` with the first member typecode that accepts it.

        Each member is handed a shallow copy of ``pyobj``.  A member that
        raises ParseException or TypeError is skipped; the one that succeeds
        is moved to the front, mirroring parse().

        If every member fails, the bare ``raise`` re-raises the last member's
        exception (Python 2 semantics).
        """
        self.setMemberTypeCodes()
        for indx, typecode in enumerate(self.memberTypeCodes):
            try:
                content = typecode.get_formatted_content(copy.copy(pyobj))
            except (ParseException, TypeError):
                continue

            self._promote(indx)
            break
        else:
            raise
        return content
-
-
-
class List(SimpleType):
    """simpleType list: whitespace-separated items of a single item type.

    Class variables:
        itemType -- (namespaceURI, name) sequence or a typecode instance
            describing the items; may be overridden per instance.
        logger -- logger for this typecode.

    Instance variables:
        itemTypeCode -- typecode instance used to convert each item.
    """
    itemType = None
    logger = _GetLogger('ZSI.TC.List')

    def __init__(self, pname=None, itemType=None, **kw):
        """Resolve the item typecode.

        itemType -- overrides the class-level ``itemType`` when given.

        When the item type is a (namespaceURI, name) sequence it is resolved
        with GTD.  If that fails and the namespace is one of the XSD/SOAP
        encoding namespaces, the module-level TYPES list is scanned instead.

        Raises:
            EvaluateException -- no typecode could be located, or the
                resolved typecode has no ``text_to_data`` method.
        """
        SimpleType.__init__(self, pname=pname, **kw)
        # The constructor argument wins over the class-level default.
        self.itemType = itemType or self.itemType
        self.itemTypeCode = self.itemType

        if type(self.itemTypeCode) in _seqtypes:
            namespaceURI, name = self.itemTypeCode
            itemTypeCode = None

            try:
                itemTypeCode = GTD(*self.itemType)(None)
            except Exception:
                # Only XSD/SOAP types have a fallback; anything else is an
                # error, so the lookup failure is re-raised.
                if not _is_xsd_or_soap_ns(namespaceURI):
                    raise

                for pyclass in TYPES:
                    if pyclass.type == self.itemTypeCode:
                        # Exact (namespace, name) match: stop searching.
                        itemTypeCode = pyclass(None)
                        break
                    elif pyclass.type[1] == name:
                        # Name-only match: remember it, keep looking for an
                        # exact one.
                        itemTypeCode = pyclass(None)

                if itemTypeCode is None:
                    raise EvaluateException(
                        'Failed to locate %s' % str(self.itemTypeCode))

            if not hasattr(itemTypeCode, 'text_to_data'):
                raise EvaluateException(
                    'TypeCode class %s missing text_to_data method'
                    % itemTypeCode)

            self.itemTypeCode = itemTypeCode

    def text_to_data(self, text, elt, ps):
        """Convert whitespace-separated ``text`` into a list of item values.

        Each item is converted by the item typecode's ``text_to_data``.  The
        list is passed through ``self.pyclass`` when that is not None.
        """
        v = [self.itemTypeCode.text_to_data(item, elt, ps)
             for item in text.split()]

        if self.pyclass is not None:
            return self.pyclass(v)

        return v

    def parse(self, elt, ps):
        """Parse ``elt`` into a list of item values.

        elt -- the DOM element being parsed
        ps -- the parser state object (used for href resolution/backtrace)

        Returns Nilled for a nil element, [] for an empty one, otherwise the
        result of text_to_data on the element's simple value.

        Raises:
            EvaluateException -- the element is empty, has no href, and
                ``nilled`` did not return False while the typecode is not
                nillable.
        """
        self.checkname(elt, ps)
        if len(_children(elt)) == 0:
            href = _find_href(elt)
            if not href:
                if self.nilled(elt, ps) is False:
                    return []

                if self.nillable is True:
                    return Nilled

                raise EvaluateException('Required string missing',
                                        ps.Backtrace(elt))

            if href[0] != '#':
                return ps.ResolveHREF(href, self)

            # Local reference: continue with the element it points to.
            elt = ps.FindLocalHREF(href, elt)
            self.checktype(elt, ps)

        if self.nilled(elt, ps):
            return Nilled

        if len(_children(elt)) == 0:
            return []

        v = self.simple_value(elt, ps)
        return self.text_to_data(v, elt, ps)

    def serialize(self, elt, sw, pyobj, name=None, orig=None, **kw):
        """Append an element holding the space-separated items of ``pyobj``.

        elt -- element the new element is appended to
        sw -- writer object (unused here)
        pyobj -- sequence to serialize, or None
        name -- optional element name override, passed to get_name

        Raises:
            EvaluateException -- pyobj is neither None nor a sequence type.
        """
        if pyobj is not None and type(pyobj) not in _seqtypes:
            raise EvaluateException('expecting a list or None')

        objid = _get_idstr(pyobj)
        ns, n = self.get_name(name, objid)
        el = elt.createAppendElement(ns, n)
        if self.nillable is True and pyobj is None:
            self.serialize_as_nil(el)
            return None

        tc = self.itemTypeCode
        s = StringIO()
        sep = ' '
        # Every item, including the last, is followed by the separator; the
        # emitted text is kept exactly as before.
        for item in pyobj:
            s.write(tc.get_formatted_content(item))
            s.write(sep)

        el.createAppendTextNode(s.getvalue())
-
-
-
def RegisterType(C, clobber=0, *args, **keywords):
    """Instantiate typecode class ``C`` and register it with Any.

    One instance, built as ``C(*args, **keywords)``, is stored in
    ``Any.parsemap`` under every key in C's own ``parselist`` and in
    ``Any.serialmap`` under every entry of C's own ``seriallist``.  Both
    lists are read from ``C.__dict__``, so inherited lists are ignored.

    C -- the typecode class to register
    clobber -- when true, replace an existing registration made by a
        different class instead of raising

    A key already registered by an instance whose class is C is left alone.

    Raises:
        TypeError -- a key is already registered by another class and
            ``clobber`` is false, or a ``seriallist`` entry is neither a
            type/class nor a string.
    """
    # Call-star syntax replaces the deprecated apply() builtin.
    instance = C(*args, **keywords)

    for t in C.__dict__.get('parselist', []):
        prev = Any.parsemap.get(t)
        if prev:
            if prev.__class__ == C:
                continue
            if not clobber:
                raise TypeError(
                    str(C) + ' duplicating parse registration for ' + str(t))
        Any.parsemap[t] = instance

    for t in C.__dict__.get('seriallist', []):
        ti = type(t)
        if ti in (types.TypeType, types.ClassType):
            key = t
        elif ti in _stringtypes:
            # A class named by string is keyed as (ClassType, name).
            key = (types.ClassType, t)
        else:
            raise TypeError(str(t) + ' is not a class name')

        prev = Any.serialmap.get(key)
        if prev:
            if prev.__class__ == C:
                continue
            if not clobber:
                raise TypeError(
                    str(C) + ' duplicating serial registration for ' + str(t))
        Any.serialmap[key] = instance
-
-
- from TCnumbers import *
- from TCtimes import *
- from schema import GTD, GED, WrapImmutable
- from TCcompound import *
- from TCapache import *
# Aliases for the schema lookup helpers under their longer names.
(_get_type_definition, _get_global_element_declaration, Wrap) = (GTD, GED, WrapImmutable)

# Predicate: an old-style class derived from TypeCode whose ``type``
# attribute is not None.
f = lambda x: (type(x) == types.ClassType
               and issubclass(x, TypeCode)
               and getattr(x, 'type', None) is not None)

# Every module-level object accepted by ``f``, in dir() (name) order.  The
# names come from this module's own namespace, so a direct globals() lookup
# replaces the previous eval() of each name.
TYPES = filter(f, map(globals().get, dir()))

if __name__ == '__main__':
    print(_copyright)
-
-